1 /**
2 * Copyright 2014 Netflix, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 package rx.internal.util;
17
18 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
19
20 import rx.Subscription;
21 import rx.functions.Func1;
22
23 /**
24 * Similar to CompositeSubscription but giving extra access to internals so we can reuse a datastructure.
25 * <p>
26 * NOTE: This purposefully is leaking the internal data structure through the API for efficiency reasons to avoid extra object allocations.
27 */
28 public final class SubscriptionIndexedRingBuffer<T extends Subscription> implements Subscription {
29
30 private volatile IndexedRingBuffer<T> subscriptions = IndexedRingBuffer.getInstance();
31 private volatile int unsubscribed = 0;
32 @SuppressWarnings("rawtypes")
33 private final static AtomicIntegerFieldUpdater<SubscriptionIndexedRingBuffer> UNSUBSCRIBED = AtomicIntegerFieldUpdater.newUpdater(SubscriptionIndexedRingBuffer.class, "unsubscribed");
34
35 public SubscriptionIndexedRingBuffer() {
36 }
37
38 @Override
39 public boolean isUnsubscribed() {
40 return unsubscribed == 1;
41 }
42
43 /**
44 * Adds a new {@link Subscription} to this {@code CompositeSubscription} if the {@code CompositeSubscription} is not yet unsubscribed. If the {@code CompositeSubscription} <em>is</em>
45 * unsubscribed, {@code add} will indicate this by explicitly unsubscribing the new {@code Subscription} as
46 * well.
47 *
48 * @param s
49 * the {@link Subscription} to add
50 *
51 * @return int index that can be used to remove a Subscription
52 */
53 public synchronized int add(final T s) {
54 // TODO figure out how to remove synchronized here. See https://github.com/ReactiveX/RxJava/issues/1420
55 if (unsubscribed == 1 || subscriptions == null) {
56 s.unsubscribe();
57 return -1;
58 } else {
59 int n = subscriptions.add(s);
60 // double check for race condition
61 if (unsubscribed == 1) {
62 s.unsubscribe();
63 }
64 return n;
65 }
66 }
67
68 /**
69 * Uses the Node received from `add` to remove this Subscription.
70 * <p>
71 * Unsubscribes the Subscription after removal
72 */
73 public void remove(final int n) {
74 if (unsubscribed == 1 || subscriptions == null || n < 0) {
75 return;
76 }
77 Subscription t = subscriptions.remove(n);
78 if (t != null) {
79 // if we removed successfully we then need to call unsubscribe on it
80 if (t != null) {
81 t.unsubscribe();
82 }
83 }
84 }
85
86 /**
87 * Uses the Node received from `add` to remove this Subscription.
88 * <p>
89 * Does not unsubscribe the Subscription after removal.
90 */
91 public void removeSilently(final int n) {
92 if (unsubscribed == 1 || subscriptions == null || n < 0) {
93 return;
94 }
95 subscriptions.remove(n);
96 }
97
98 @Override
99 public void unsubscribe() {
100 if (UNSUBSCRIBED.compareAndSet(this, 0, 1) && subscriptions != null) {
101 // we will only get here once
102 unsubscribeFromAll(subscriptions);
103
104 IndexedRingBuffer<T> s = subscriptions;
105 subscriptions = null;
106 s.unsubscribe();
107 }
108 }
109
110 public int forEach(Func1<T, Boolean> action) {
111 return forEach(action, 0);
112 }
113
114 /**
115 *
116 * @param action
117 * @return int of last index seen if forEach exited early
118 */
119 public synchronized int forEach(Func1<T, Boolean> action, int startIndex) {
120 // TODO figure out how to remove synchronized here. See https://github.com/ReactiveX/RxJava/issues/1420
121 if (unsubscribed == 1 || subscriptions == null) {
122 return 0;
123 }
124 return subscriptions.forEach(action, startIndex);
125 }
126
127 private static void unsubscribeFromAll(IndexedRingBuffer<? extends Subscription> subscriptions) {
128 if (subscriptions == null) {
129 return;
130 }
131
132 // TODO migrate to drain (remove while we're doing this) so we don't have to immediately clear it in IndexedRingBuffer.releaseToPool?
133 subscriptions.forEach(UNSUBSCRIBE);
134 }
135
136 private final static Func1<Subscription, Boolean> UNSUBSCRIBE = new Func1<Subscription, Boolean>() {
137
138 @Override
139 public Boolean call(Subscription s) {
140 s.unsubscribe();
141 return Boolean.TRUE;
142 }
143 };
144
145 }